1   /**
2    * Copyright 2014 Netflix, Inc.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    * http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package rx.internal.util;
17  
18  import java.util.concurrent.atomic.AtomicInteger;
19  import java.util.concurrent.atomic.AtomicIntegerArray;
20  import java.util.concurrent.atomic.AtomicReference;
21  import java.util.concurrent.atomic.AtomicReferenceArray;
22  
23  import rx.Subscription;
24  import rx.functions.Func1;
25  
26  /**
27   * Add/Remove without object allocation (after initial construction).
28   * <p>
29   * This is meant for hundreds or single-digit thousands of elements that need
30   * to be rapidly added and randomly or sequentially removed while avoiding object allocation.
31   * <p>
 * On Intel Core i7, 2.3GHz, Mac Java 8:
33   * <p>
34   * - adds per second single-threaded => ~32,598,500 for 100
35   * - adds per second single-threaded => ~23,200,000 for 10,000
36   * - adds + removes per second single-threaded => 15,562,100 for 100
37   * - adds + removes per second single-threaded => 8,760,000 for 10,000
38   * 
39   * <pre> {@code
40   * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
41   * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   263571.721     9856.994    ops/s
42   * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1763.417      211.998    ops/s
43   * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   139850.115    17143.705    ops/s
44   * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      809.982       72.931    ops/s
45   * } </pre>
46   * 
 * @param <E> the type of the stored elements
48   */
49  public final class IndexedRingBuffer<E> implements Subscription {
50  
51      private static final ObjectPool<IndexedRingBuffer<?>> POOL = new ObjectPool<IndexedRingBuffer<?>>() {
52  
53          @Override
54          protected IndexedRingBuffer<?> createObject() {
55              return new IndexedRingBuffer<Object>();
56          }
57  
58      };
59  
60      @SuppressWarnings("unchecked")
61      public final static <T> IndexedRingBuffer<T> getInstance() {
62          return (IndexedRingBuffer<T>) POOL.borrowObject();
63      }
64  
65      private final ElementSection<E> elements = new ElementSection<E>();
66      private final IndexSection removed = new IndexSection();
67      /* package for unit testing */final AtomicInteger index = new AtomicInteger();
68      /* package for unit testing */final AtomicInteger removedIndex = new AtomicInteger();
69      
70      // default size of ring buffer
71      /**
72       * Set at 256 ... Android defaults far smaller which likely will never hit the use cases that require the higher buffers.
73       * <p>
74       * The 10000 size test represents something that should be a rare use case (merging 10000 concurrent Observables for example) 
75       * 
76       * <pre> {@code
77       * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*IndexedRingBufferPerf.*'
78       * 
79       * 1024
80       * 
81       * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
82       * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   269292.006     6013.347    ops/s
83       * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     2217.103      163.396    ops/s
84       * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   139349.608     9397.232    ops/s
85       * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5     1045.323       30.991    ops/s
86       * 
87       * 512
88       * 
89       * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
90       * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   270919.870     5381.793    ops/s
91       * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1724.436       42.287    ops/s
92       * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   141478.813     3696.030    ops/s
93       * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      719.447       75.629    ops/s
94       * 
95       * 
96       * 256
97       * 
98       * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
99       * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   272042.605     7954.982    ops/s
100      * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1101.329       23.566    ops/s
101      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   140479.804     6389.060    ops/s
102      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      397.306       24.222    ops/s
103      * 
104      * 128
105      * 
106      * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
107      * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   263065.312    11168.941    ops/s
108      * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5      581.708       17.397    ops/s
109      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   138051.488     4618.935    ops/s
110      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      176.873       35.669    ops/s
111      * 
112      * 32
113      * 
114      * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
115      * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   250737.473    17260.148    ops/s
116      * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5      144.725       26.284    ops/s
117      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   118832.832     9082.658    ops/s
118      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5       32.133        8.048    ops/s
119      * 
120      * 8
121      * 
122      * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
123      * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   209192.847    25558.124    ops/s
124      * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5       26.520        3.100    ops/s
125      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   100200.463     1854.259    ops/s
126      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5        8.456        2.114    ops/s
127      * 
128      * 2
129      * 
130      * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
131      * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5    96549.208     4427.239    ops/s
132      * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5        6.637        2.025    ops/s
133      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5    34553.169     4904.197    ops/s
134      * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5        2.159        0.700    ops/s
135      * } </pre>
136      * 
137      * Impact of IndexedRingBuffer size on merge
138      * 
139      * <pre> {@code
140      * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorMergePerf.*'
141      * 
142      * 512
143      * 
144      * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
145      * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5282500.038   530541.761    ops/s
146      * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    49327.272     6382.189    ops/s
147      * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.025        4.724    ops/s
148      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    97395.148     2489.303    ops/s
149      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.723        1.479    ops/s
150      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4534067.250   116321.725    ops/s
151      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458561.098    27652.081    ops/s
152      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    43267.381     2648.107    ops/s
153      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5581051.672   144191.849    ops/s
154      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.643        4.354    ops/s
155      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76437.644      959.748    ops/s
156      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2965.306      272.928    ops/s
157      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5026522.098   364196.255    ops/s
158      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34926.819      938.612    ops/s
159      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       33.342        1.701    ops/s
160      * 
161      * 
162      * 128
163      * 
164      * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
165      * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5144891.776   271990.561    ops/s
166      * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    53580.161     2370.204    ops/s
167      * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.265        2.236    ops/s
168      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96634.426     1417.430    ops/s
169      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.648        0.255    ops/s
170      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4601280.220    53157.938    ops/s
171      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   463394.568    58612.882    ops/s
172      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50503.565     2394.168    ops/s
173      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5490315.842   228654.817    ops/s
174      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.661        3.385    ops/s
175      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    74716.169     7413.642    ops/s
176      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3009.476      277.075    ops/s
177      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4953313.642   307512.126    ops/s
178      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35335.579     2368.377    ops/s
179      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       37.450        0.655    ops/s
180      * 
181      * 32
182      * 
183      * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
184      * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4975957.497   365423.694    ops/s
185      * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    52141.226     5056.658    ops/s
186      * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.663        2.671    ops/s
187      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96507.893     1833.371    ops/s
188      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.850        0.782    ops/s
189      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4557128.302   118516.934    ops/s
190      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   339005.037    10594.737    ops/s
191      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50781.535     6071.787    ops/s
192      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5604920.068   209285.840    ops/s
193      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.413        7.496    ops/s
194      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76098.942      558.187    ops/s
195      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2988.137      193.255    ops/s
196      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5177255.256   150253.086    ops/s
197      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34772.490      909.967    ops/s
198      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.847        0.606    ops/s
199      * 
200      * 8
201      * 
202      * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
203      * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5027331.903   337986.410    ops/s
204      * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    51746.540     3585.450    ops/s
205      * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       52.682        4.026    ops/s
206      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96805.587     2868.112    ops/s
207      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.598        0.290    ops/s
208      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4390912.630   300687.310    ops/s
209      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458615.731    56125.958    ops/s
210      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    49033.105     6132.936    ops/s
211      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5090614.100   649439.778    ops/s
212      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       48.548        3.586    ops/s
213      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    72285.482    16820.952    ops/s
214      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2981.576      316.140    ops/s
215      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4993609.293   267975.397    ops/s
216      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    33228.972     1554.924    ops/s
217      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       32.994        3.615    ops/s
218      * 
219      * 
220      * 2
221      * 
222      * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
223      * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5103812.234   939461.192    ops/s
224      * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    51491.116     3790.056    ops/s
225      * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       54.043        2.340    ops/s
226      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96575.834    13416.541    ops/s
227      * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.740        0.047    ops/s
228      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4435909.832   899133.671    ops/s
229      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   392382.445    59814.783    ops/s
230      * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50429.258     7489.849    ops/s
231      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5637321.803   161838.195    ops/s
232      * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       51.065        2.138    ops/s
233      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76366.764     2631.710    ops/s
234      * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2978.302      296.418    ops/s
235      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5280829.290  1602542.493    ops/s
236      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35070.518     3565.672    ops/s
237      * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.501        0.991    ops/s
238      * 
239      * } </pre>
240      */
241     static int _size = 256;
242     static {
243         // lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
244         if (PlatformDependent.isAndroid()) {
245             _size = 8;
246         }
247 
248         // possible system property for overriding
249         String sizeFromProperty = System.getProperty("rx.indexed-ring-buffer.size"); // also see RxRingBuffer
250         if (sizeFromProperty != null) {
251             try {
252                 _size = Integer.parseInt(sizeFromProperty);
253             } catch (Exception e) {
254                 System.err.println("Failed to set 'rx.indexed-ring-buffer.size' with value " + sizeFromProperty + " => " + e.getMessage());
255             }
256         }
257     }
258     
259     /* package for unit testing */static final int SIZE = _size;
260 
261     /**
262      * This resets the arrays, nulls out references and returns it to the pool.
263      * This extra CPU cost is far smaller than the object allocation cost of not pooling.
264      */
265     public void releaseToPool() {
266         // need to clear all elements so we don't leak memory
267         int maxIndex = index.get();
268         int realIndex = 0;
269         ElementSection<E> section = elements;
270         outer: while (section != null) {
271             for (int i = 0; i < SIZE; i++, realIndex++) {
272                 if (realIndex >= maxIndex) {
273                     section = null;
274                     break outer;
275                 }
276                 // we can use lazySet here because we are nulling things out and not accessing them again
277                 // (relative on Mac Intel i7) lazySet gets us ~30m vs ~26m ops/second in the JMH test (100 adds per release)
278                 section.array.set(i, null);
279             }
280             section = section.next.get();
281         }
282 
283         index.set(0);
284         removedIndex.set(0);
285         POOL.returnObject(this);
286     }
287 
288     @Override
289     public void unsubscribe() {
290         releaseToPool();
291     }
292 
293     private IndexedRingBuffer() {
294     }
295 
296     /**
297      * Add an element and return the index where it was added to allow removal.
298      * 
299      * @param e
300      * @return
301      */
302     public int add(E e) {
303         int i = getIndexForAdd();
304         if (i < SIZE) {
305             // fast-path when we are in the first section
306             elements.array.set(i, e);
307             return i;
308         } else {
309             int sectionIndex = i % SIZE;
310             getElementSection(i).array.set(sectionIndex, e);
311             return i;
312         }
313     }
314 
315     public E remove(int index) {
316         E e;
317         if (index < SIZE) {
318             // fast-path when we are in the first section
319             e = elements.array.getAndSet(index, null);
320         } else {
321             int sectionIndex = index % SIZE;
322             e = getElementSection(index).array.getAndSet(sectionIndex, null);
323         }
324         pushRemovedIndex(index);
325         return e;
326     }
327 
328     private IndexSection getIndexSection(int index) {
329         // short-cut the normal case
330         if (index < SIZE) {
331             return removed;
332         }
333 
334         // if we have passed the first array we get more complicated and do recursive chaining
335         int numSections = index / SIZE;
336         IndexSection a = removed;
337         for (int i = 0; i < numSections; i++) {
338             a = a.getNext();
339         }
340         return a;
341     }
342 
343     private ElementSection<E> getElementSection(int index) {
344         // short-cut the normal case
345         if (index < SIZE) {
346             return elements;
347         }
348 
349         // if we have passed the first array we get more complicated and do recursive chaining
350         int numSections = index / SIZE;
351         ElementSection<E> a = elements;
352         for (int i = 0; i < numSections; i++) {
353             a = a.getNext();
354         }
355         return a;
356     }
357 
358     private synchronized int getIndexForAdd() {
359         /*
360          * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
361          */
362         int i;
363         int ri = getIndexFromPreviouslyRemoved();
364         if (ri >= 0) {
365             if (ri < SIZE) {
366                 // fast-path when we are in the first section
367                 i = removed.getAndSet(ri, -1);
368             } else {
369                 int sectionIndex = ri % SIZE;
370                 i = getIndexSection(ri).getAndSet(sectionIndex, -1);
371             }
372             if (i == index.get()) {
373                 // if it was the last index removed, when we pick it up again we want to increment
374                 index.getAndIncrement();
375             }
376         } else {
377             i = index.getAndIncrement();
378         }
379         return i;
380     }
381 
382     /**
383      * Returns -1 if nothing, 0 or greater if the index should be used
384      * 
385      * @return
386      */
387     private synchronized int getIndexFromPreviouslyRemoved() {
388         /*
389          * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
390          */
391 
392         // loop because of CAS
393         while (true) {
394             int currentRi = removedIndex.get();
395             if (currentRi > 0) {
396                 // claim it
397                 if (removedIndex.compareAndSet(currentRi, currentRi - 1)) {
398                     return currentRi - 1;
399                 }
400             } else {
401                 // do nothing
402                 return -1;
403             }
404         }
405     }
406 
407     private synchronized void pushRemovedIndex(int elementIndex) {
408         /*
409          * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
410          */
411 
412         int i = removedIndex.getAndIncrement();
413         if (i < SIZE) {
414             // fast-path when we are in the first section
415             removed.set(i, elementIndex);
416         } else {
417             int sectionIndex = i % SIZE;
418             getIndexSection(i).set(sectionIndex, elementIndex);
419         }
420     }
421 
422     @Override
423     public boolean isUnsubscribed() {
424         return false;
425     }
426 
427     public int forEach(Func1<? super E, Boolean> action) {
428         return forEach(action, 0);
429     }
430 
431     /**
432      * 
433      * @param action
434      *            that processes each item and returns true if it wants to continue to the next
435      * @return int of next index to process, or last index seen if it exited early
436      */
437     public int forEach(Func1<? super E, Boolean> action, int startIndex) {
438         int endedAt = forEach(action, startIndex, index.get());
439         if (startIndex > 0 && endedAt == index.get()) {
440             // start at the beginning again and go up to startIndex
441             endedAt = forEach(action, 0, startIndex);
442         } else if (endedAt == index.get()) {
443             // start back at the beginning
444             endedAt = 0;
445         }
446         return endedAt;
447     }
448 
449     private int forEach(Func1<? super E, Boolean> action, int startIndex, int endIndex) {
450         int lastIndex = startIndex;
451         int maxIndex = index.get();
452         int realIndex = startIndex;
453         ElementSection<E> section = elements;
454 
455         if (startIndex >= SIZE) {
456             // move into the correct section
457             section = getElementSection(startIndex);
458             startIndex = startIndex % SIZE;
459         }
460 
461         outer: while (section != null) {
462             for (int i = startIndex; i < SIZE; i++, realIndex++) {
463                 if (realIndex >= maxIndex || realIndex >= endIndex) {
464                     section = null;
465                     break outer;
466                 }
467                 E element = section.array.get(i);
468                 if (element == null) {
469                     continue;
470                 }
471                 lastIndex = realIndex;
472                 boolean continueLoop = action.call(element);
473                 if (!continueLoop) {
474                     return lastIndex;
475                 }
476             }
477             section = section.next.get();
478             startIndex = 0; // reset to start for next section
479         }
480 
481         // return the OutOfBounds index position if we processed all of them ... the one we should be less-than
482         return realIndex;
483     }
484 
485     private static class ElementSection<E> {
486         private final AtomicReferenceArray<E> array = new AtomicReferenceArray<E>(SIZE);
487         private final AtomicReference<ElementSection<E>> next = new AtomicReference<ElementSection<E>>();
488 
489         ElementSection<E> getNext() {
490             if (next.get() != null) {
491                 return next.get();
492             } else {
493                 ElementSection<E> newSection = new ElementSection<E>();
494                 if (next.compareAndSet(null, newSection)) {
495                     // we won
496                     return newSection;
497                 } else {
498                     // we lost so get the value that won
499                     return next.get();
500                 }
501             }
502         }
503     }
504 
505     private static class IndexSection {
506 
507         private final AtomicIntegerArray unsafeArray = new AtomicIntegerArray(SIZE);
508 
509         public int getAndSet(int expected, int newValue) {
510             return unsafeArray.getAndSet(expected, newValue);
511         }
512 
513         public void set(int i, int elementIndex) {
514             unsafeArray.set(i, elementIndex);
515         }
516 
517         private final AtomicReference<IndexSection> _next = new AtomicReference<IndexSection>();
518 
519         IndexSection getNext() {
520             if (_next.get() != null) {
521                 return _next.get();
522             } else {
523                 IndexSection newSection = new IndexSection();
524                 if (_next.compareAndSet(null, newSection)) {
525                     // we won
526                     return newSection;
527                 } else {
528                     // we lost so get the value that won
529                     return _next.get();
530                 }
531             }
532         }
533 
534     }
535 
536 }